Skip to main content

AMQP Protocol - Network Layer Protocol

AMQP is a binary protocol. Everything is organized into packet frames, and those frames carries informations about methods, classes and more.

Each AMQP Frame consists of:

  1. 0th Uint8: -> Type of packet
    1. 1: Method
    2. 2: Header
    3. 3: Body
    4. 8: Heartbeat
  2. 1th Uint16 -> Channel ID - Usually a integer between 1 and 256
  3. 3th Uint32 -> FrameSize: The byte length for the entire frame
  4. Framesize + 3th -> The Frame End: A Value to identify the frameEnd -> always 206

To write a client library for AMQP all clients libraries start the connection while a persistent TCP connection. In this example, we are using NodeJS.

AMQP protocol is a binary implementation of RPC calls. The communication between client-server (client-broker) is made by formatting a TCP packet with some binary format - that encapsulates method calls - having the method name, channel/connection which it was invoked and the arguments of it.

AMQP DataTypes

The AMQP frames supports the following data types

  1. Integers -> 1 to 8 octets
  2. Bits -> Single octets : represents ON/OFF values. Boolean
  3. Short strings -> Single octet w/ string values ranging up until 255 octets
    1. it does fulfill 2 spaces, since we need the length of it also.
  4. LongStrings -> holds binary data?
  5. Table -> key-value pairs where value is one of the DataTypes

AMQP Frames

All frames consist of a headers - which is 7 octets, a body and a FrameEnd octet. So each time we parse these frames, we need to read the headers - which will give us the packet size then ensure the last octet - FrameEnd - has value of 206.

When implementing the protocol and performance is a concern we should use ReadAheadBuffering & Gathering Reads to avoid constant calls to SO.

Each frame should also inform the PAYLOAD size, which is basically the Method payload.

Frame-End

The FrameEnd is used "to detect framing errors caused by incorrect client or server implementations." These would case connection being closed.

The frame-end octet MUST always be the hexadecimal value %xCE

AMQP Method Frames

To process a method frame, we should read, separately, the first two octet which will give us, in order, the classId and the methodId.

octets

Establishing a Connection

To establish a connection with AMQP, the client needs to connect via TCP Socket and emit a packet telling the AMQP service which AMQP version we are talking to. This packet is called the Protocol Headers and should contain the following buffer "AMQP0{version}" as bytes

const net = require('net');

//This should start the entire RPC flow to connect to AMQP
const connection = net.connect(5672, 'localhost', () => {
const packet = Buffer.from(`AMQP${String.fromCharCode(0,0,9,1)}`);
connection.write(packet);
})

Receiving Frames and Packets

As soon as the AMQP broker receives the above packets, it will start communicating while establishing connections, the way we handle it is a little bit different than usual packets, since AMQP uses Channels, the connection channel Id Is always 0.

Following the above guidelines, we can start reading the received frame.

connection.on('data', (buffer) => {
const view = new DataView(data.buffer);
const type = view.getUint8(0); //0th is the packet type
const channelId = view.getUint16(1); //1th is the ChannelID - 1 to 256
const frameSize = view.getUint32(3); //3th is the FrameSize
const frameEnd = view.getUint8(7 + frameSize) //7th+FrameSize=FrameEnd

if(frameEnd !== 206){
//FrameEnd values should always be 206
throw new Error('Frame end invalid');
}

if(type === 1){ //If type is Method
const classId = view.getUint16(7);
const methodId = view.getUint16(7 + 2);
//If classId is 10(Connection) and methodId is 10(Start)
//it means we received a Connection#Start RPC
if(classId === 10 && methodId === 10){
console.log(`We are ready to reply with Connection#StartOk`);
}
}
})